package penguin.transfer.netty.server;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.execution.ExecutionHandler;
import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
import org.jboss.netty.handler.timeout.IdleStateHandler;
import org.jboss.netty.util.HashedWheelTimer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import penguin.common.Constants;
import penguin.rpc.RpcClient;
import penguin.transfer.TransferListener;
import penguin.transfer.dto.TransferDTO;
import penguin.transfer.netty.common.NettyCodecAdapter;

import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Created on 15/5/7 下午10:39
 *
 * @author 王建华(penguin83@126.com)
 */
public class NettyServer {

    private static final Logger LOGGER = LoggerFactory.getLogger(NettyServer.class);

    private final ChannelHandler idleStateHandler=new IdleStateHandler(new HashedWheelTimer(), Constants.HEART_TIME_OUT,Constants.HEART_TIME_OUT,0);
    private TransferListener transferListener;

    public NettyServer(int port,TransferListener transferListener) {
        this.transferListener = transferListener;
        // Server服务启动器
        ServerBootstrap bootstrap = new ServerBootstrap(
                new NioServerSocketChannelFactory(
                        Executors.newCachedThreadPool(),
                        Executors.newCachedThreadPool()));
        final NettyCodecAdapter adapter = new NettyCodecAdapter();
        // 设置一个处理客户端消息和各种消息事件的类(Handler)
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() throws Exception {

                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("idle", idleStateHandler);
                pipeline.addLast("aware",new NettyServerIdleStateAwareChannelHandler());
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("execute",  new ExecutionHandler(new OrderedMemoryAwareThreadPoolExecutor(100, 1048576, 1048576)));
                pipeline.addLast("handler", new MessageServerHandler());

                return pipeline;
            }
        });
        bootstrap.bind(new InetSocketAddress(port));
        LOGGER.debug("netty server started:"+port);
    }

    private  class MessageServerHandler extends SimpleChannelHandler {

       ExecutorService pool= Executors.newCachedThreadPool();


        @Override
        public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
            InetSocketAddress inetSocketAddress = (InetSocketAddress)e.getChannel().getRemoteAddress();
            LOGGER.debug("channelConnected["+inetSocketAddress.getAddress().getHostAddress()+":"+inetSocketAddress.getPort()+"]");

            transferListener.connected(inetSocketAddress.getHostName(),inetSocketAddress.getPort());
        }

        @Override
        public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
            InetSocketAddress inetSocketAddress = (InetSocketAddress)e.getChannel().getRemoteAddress();
            LOGGER.debug("channelConnected["+inetSocketAddress.getAddress().getHostAddress()+":"+inetSocketAddress.getPort()+"]");
            transferListener.disConnected(inetSocketAddress.getHostName(), inetSocketAddress.getPort());
        }

        /**
         * 用户接受客户端发来的消息，在有客户端消息到达时触发
         *
         * @author lihzh
         * @alia OneCoder
         */
        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
            final TransferDTO param = (TransferDTO) e.getMessage();
            final Channel channel = e.getChannel();

            if("heart".equals(param.getType())){
                channel.write(param);
                return;
            }

            pool.execute(new Runnable() {
                public void run() {
                    transferListener.receiveMessage(channel, param);
                }
            });

        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
            LOGGER.error("exceptionCaught[" + e.getChannel().getRemoteAddress().toString() + "]", e.getCause());
            e.getChannel().disconnect();
        }
    }


}
